- Model Monitoring
- Overview
- Monitoring deployed machine learning models ensures they continue to perform well in production by detecting data drift, concept drift, and performance degradation.
- When to Use
- When models are deployed in production environments serving real users
- When detecting data drift or concept drift in input features
- When tracking model performance metrics over time
- When ensuring model reliability, accuracy, and operational health
- When implementing ML observability and alerting systems
- When establishing thresholds for model retraining or intervention
- Monitoring Components
- Performance Metrics: Accuracy, latency, throughput
- Data Drift: Distribution changes in input features
- Concept Drift: Changes in the relationship between input features and the target variable
- Output Drift: Changes in the prediction distribution
- Feature Drift: Distribution changes in individual features
- Anomaly Detection: Unusual samples appearing in production (a minimal detection sketch follows this list)
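The main implementation below covers drift and performance checks but not anomaly detection, so here is a minimal, hedged sketch of that last component using scikit-learn's IsolationForest. The `flag_anomalies` helper, the contamination rate, and the reference/production variable names are illustrative assumptions, not part of the implementation that follows.

```python
# Minimal sketch: flag anomalous production samples with an IsolationForest.
# Helper name, contamination rate, and variable names are illustrative assumptions.
import numpy as np
from sklearn.ensemble import IsolationForest

def flag_anomalies(X_reference: np.ndarray, X_production: np.ndarray, contamination: float = 0.01):
    """Fit on reference (training-time) data, then score incoming production rows."""
    detector = IsolationForest(contamination=contamination, random_state=42)
    detector.fit(X_reference)
    labels = detector.predict(X_production)        # +1 for inliers, -1 for anomalies
    anomaly_idx = np.where(labels == -1)[0]
    scores = detector.score_samples(X_production)  # lower score = more anomalous
    return anomaly_idx, scores

# Usage: indices of suspicious rows in the latest production batch
# anomalies, scores = flag_anomalies(X_baseline, X_latest_batch)
```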
- Monitoring Tools
- Prometheus: Metrics collection and storage
- Grafana: Visualization and dashboarding
- MLflow: Model tracking and registry
- TensorFlow Data Validation: Data statistics and validation
- Evidently: Drift detection and monitoring
- Great Expectations: Data quality assertions
- Python Implementation

```python
import json
import logging
from datetime import datetime

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy import stats
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

print("=== 1. Production Monitoring System ===")


class ModelMonitoringSystem:
    def __init__(self, model, scaler, baseline_data, baseline_targets):
        self.model = model
        self.scaler = scaler
        self.baseline_data = baseline_data
        self.baseline_targets = baseline_targets
        self.baseline_mean = baseline_data.mean(axis=0)
        self.baseline_std = baseline_data.std(axis=0)
        self.baseline_predictions = model.predict(baseline_data)
        self.metrics_history = []
        self.drift_alerts = []
        self.performance_history = []

    def log_predictions(self, X, y_true, y_pred):
        """Log a batch of predictions and compute performance metrics."""
        timestamp = datetime.now()
        # Compute metrics
        accuracy = accuracy_score(y_true, y_pred)
        precision = precision_score(y_true, y_pred, average='weighted', zero_division=0)
        recall = recall_score(y_true, y_pred, average='weighted', zero_division=0)
        f1 = f1_score(y_true, y_pred, average='weighted', zero_division=0)
        metric_record = {
            'timestamp': timestamp,
            'accuracy': accuracy,
            'precision': precision,
            'recall': recall,
            'f1': f1,
            'n_samples': len(X),
        }
        self.metrics_history.append(metric_record)
        return metric_record

    def detect_data_drift(self, X_new):
        """Detect data drift per feature using the Kolmogorov-Smirnov test."""
        drift_detected = False
        drift_features = []
        for feature_idx in range(X_new.shape[1]):
            baseline_feature = self.baseline_data[:, feature_idx]
            new_feature = X_new[:, feature_idx]
            # KS test
            ks_statistic, p_value = stats.ks_2samp(baseline_feature, new_feature)
            if p_value < 0.05:  # Significant drift detected
                drift_detected = True
                drift_features.append({
                    'feature_index': feature_idx,
                    'ks_statistic': float(ks_statistic),
                    'p_value': float(p_value),
                })
        if drift_detected:
            alert = {
                'timestamp': datetime.now(),
                'type': 'data_drift',
                'severity': 'high',
                'drifted_features': drift_features,
                'n_drifted': len(drift_features),
            }
            self.drift_alerts.append(alert)
            logger.warning(f"Data drift detected in {len(drift_features)} features")
        return drift_detected, drift_features

    def detect_output_drift(self, y_pred_new):
        """Detect drift in the distribution of model predictions."""
        baseline_pred_dist = pd.Series(self.baseline_predictions).value_counts(normalize=True)
        new_pred_dist = pd.Series(y_pred_new).value_counts(normalize=True)
        # Compare class frequency distributions with a chi-square-style statistic
        classes = set(baseline_pred_dist.index) | set(new_pred_dist.index)
        chi2_stat = 0
        for cls in classes:
            exp = baseline_pred_dist.get(cls, 0.01)
            obs = new_pred_dist.get(cls, 0.01)
            chi2_stat += (obs - exp) ** 2 / max(exp, 0.01)
        p_value = 1 - stats.chi2.cdf(chi2_stat, len(classes) - 1)
        if p_value < 0.05:
            alert = {
                'timestamp': datetime.now(),
                'type': 'output_drift',
                'severity': 'medium',
                'chi2_statistic': float(chi2_stat),
                'p_value': float(p_value),
            }
            self.drift_alerts.append(alert)
            logger.warning("Output drift detected in predictions")
            return True
        return False

    def detect_performance_degradation(self, y_true, y_pred):
        """Detect whether model accuracy has dropped below the baseline."""
        current_accuracy = accuracy_score(y_true, y_pred)
        baseline_accuracy = accuracy_score(self.baseline_targets, self.baseline_predictions)
        degradation_threshold = 0.05  # Alert on a 5% accuracy drop
        degradation = baseline_accuracy - current_accuracy
        if degradation > degradation_threshold:
            alert = {
                'timestamp': datetime.now(),
                'type': 'performance_degradation',
                'severity': 'critical',
                'baseline_accuracy': float(baseline_accuracy),
                'current_accuracy': float(current_accuracy),
                'degradation': float(degradation),
            }
            self.drift_alerts.append(alert)
            logger.error("Performance degradation detected")
            return True
        return False

    def get_monitoring_report(self):
        """Generate a summary monitoring report."""
        if not self.metrics_history:
            return {}
        metrics_df = pd.DataFrame(self.metrics_history)
        return {
            'monitoring_period': {
                'start': self.metrics_history[0]['timestamp'].isoformat(),
                'end': self.metrics_history[-1]['timestamp'].isoformat(),
                'n_batches': len(self.metrics_history),
            },
            'performance_summary': {
                'avg_accuracy': float(metrics_df['accuracy'].mean()),
                'min_accuracy': float(metrics_df['accuracy'].min()),
                'max_accuracy': float(metrics_df['accuracy'].max()),
                'std_accuracy': float(metrics_df['accuracy'].std()),
                'avg_f1': float(metrics_df['f1'].mean()),
            },
            'drift_summary': {
                'total_alerts': len(self.drift_alerts),
                'data_drift_alerts': sum(1 for a in self.drift_alerts if a['type'] == 'data_drift'),
                'output_drift_alerts': sum(1 for a in self.drift_alerts if a['type'] == 'output_drift'),
                'performance_alerts': sum(1 for a in self.drift_alerts if a['type'] == 'performance_degradation'),
            },
            'alerts': self.drift_alerts[-10:],  # Last 10 alerts
        }


print("Monitoring system initialized")

# 2. Create baseline model
print("\n=== 2. Train Baseline Model ===")
from sklearn.datasets import make_classification

# Create baseline data
X_baseline, y_baseline = make_classification(
    n_samples=1000, n_features=20, n_informative=15, random_state=42
)
scaler = StandardScaler()
X_baseline_scaled = scaler.fit_transform(X_baseline)
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_baseline_scaled, y_baseline)
print(f"Baseline model trained on {len(X_baseline)} samples")

# 3. Initialize monitoring
monitor = ModelMonitoringSystem(model, scaler, X_baseline_scaled, y_baseline)

# 4. Simulate production data and monitoring
print("\n=== 3. Production Monitoring Simulation ===")

# Simulate normal production data
X_prod_normal = np.random.randn(500, 20) * 0.5
X_prod_normal_scaled = scaler.transform(X_prod_normal)
y_pred_normal = model.predict(X_prod_normal_scaled)
y_true_normal = np.random.randint(0, 2, 500)
metrics_normal = monitor.log_predictions(X_prod_normal_scaled, y_true_normal, y_pred_normal)
print(f"Normal production batch - Accuracy: {metrics_normal['accuracy']:.4f}")

# Simulate drifted data
X_prod_drift = np.random.randn(500, 20) * 2.0  # Different distribution
X_prod_drift_scaled = scaler.transform(X_prod_drift)
y_pred_drift = model.predict(X_prod_drift_scaled)
y_true_drift = np.random.randint(0, 2, 500)
metrics_drift = monitor.log_predictions(X_prod_drift_scaled, y_true_drift, y_pred_drift)
drift_detected, drift_features = monitor.detect_data_drift(X_prod_drift_scaled)
print(f"Drifted production batch - Accuracy: {metrics_drift['accuracy']:.4f}")
print(f"Data drift detected: {drift_detected}")

# Check performance degradation
perf_degradation = monitor.detect_performance_degradation(y_true_drift, y_pred_drift)
print(f"Performance degradation: {perf_degradation}")

# 5. Prometheus metrics export (text exposition format)
print("\n=== 4. Prometheus Metrics Format ===")
prometheus_metrics = f"""
# HELP model_accuracy Model accuracy score
# TYPE model_accuracy gauge
model_accuracy{{timestamp="2024-01-01"}} {metrics_normal['accuracy']:.4f}
# HELP model_f1_score Model F1 score
# TYPE model_f1_score gauge
model_f1_score{{timestamp="2024-01-01"}} {metrics_normal['f1']:.4f}
# HELP model_predictions_total Total predictions made
# TYPE model_predictions_total counter
model_predictions_total 5000
# HELP model_drift_detected Data drift detection flag
# TYPE model_drift_detected gauge
model_drift_detected {int(drift_detected)}
# HELP model_latency_seconds Prediction latency in seconds
# TYPE model_latency_seconds histogram
model_latency_seconds_bucket{{le="0.01"}} 100
model_latency_seconds_bucket{{le="0.05"}} 450
model_latency_seconds_bucket{{le="0.1"}} 500
"""
print("Prometheus metrics:")
print(prometheus_metrics)

# 6. Grafana dashboard JSON
print("\n=== 5. Grafana Dashboard Configuration ===")
grafana_dashboard = {
    'title': 'ML Model Monitoring Dashboard',
    'panels': [
        {'title': 'Model Accuracy', 'targets': [{'metric': 'model_accuracy'}], 'type': 'graph'},
        {'title': 'Data Drift Alerts', 'targets': [{'metric': 'model_drift_detected'}], 'type': 'stat'},
        {'title': 'Prediction Latency', 'targets': [{'metric': 'model_latency_seconds'}], 'type': 'graph'},
        {'title': 'Feature Distributions', 'targets': [{'metric': 'feature_distribution_change'}], 'type': 'heatmap'},
    ],
}
print("Grafana dashboard configured with 4 panels")

# 7. Visualization
print("\n=== 6. Monitoring Visualization ===")
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# Performance over time
metrics_df = pd.DataFrame(monitor.metrics_history)
axes[0, 0].plot(range(len(metrics_df)), metrics_df['accuracy'], marker='o', label='Accuracy')
axes[0, 0].axhline(y=metrics_df['accuracy'].mean(), color='r', linestyle='--', label='Mean')
axes[0, 0].set_xlabel('Batch')
axes[0, 0].set_ylabel('Accuracy')
axes[0, 0].set_title('Model Accuracy Over Time')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)

# Precision, Recall, F1
axes[0, 1].plot(range(len(metrics_df)), metrics_df['precision'], label='Precision', marker='s')
axes[0, 1].plot(range(len(metrics_df)), metrics_df['recall'], label='Recall', marker='^')
axes[0, 1].plot(range(len(metrics_df)), metrics_df['f1'], label='F1', marker='d')
axes[0, 1].set_xlabel('Batch')
axes[0, 1].set_ylabel('Score')
axes[0, 1].set_title('Performance Metrics Over Time')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)

# Feature distributions (baseline vs current)
feature_stats = pd.DataFrame({
    'Baseline Mean': np.mean(X_baseline, axis=0)[:10],
    'Current Mean': np.mean(X_prod_drift, axis=0)[:10],
})
axes[1, 0].bar(range(len(feature_stats)), feature_stats['Baseline Mean'], alpha=0.7, label='Baseline')
axes[1, 0].bar(range(len(feature_stats)), feature_stats['Current Mean'], alpha=0.7, label='Current')
axes[1, 0].set_xlabel('Feature')
axes[1, 0].set_ylabel('Mean Value')
axes[1, 0].set_title('Feature Distribution Drift (First 10)')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3, axis='y')

# Alerts over time
alert_types = [a['type'] for a in monitor.drift_alerts]
alert_counts = pd.Series(alert_types).value_counts()
axes[1, 1].barh(alert_counts.index, alert_counts.values,
                color=['red', 'orange', 'yellow'][:len(alert_counts)])
axes[1, 1].set_xlabel('Count')
axes[1, 1].set_title('Alert Types Distribution')
axes[1, 1].grid(True, alpha=0.3, axis='x')

plt.tight_layout()
plt.savefig('model_monitoring_dashboard.png', dpi=100, bbox_inches='tight')
print("\nMonitoring dashboard saved as 'model_monitoring_dashboard.png'")

# 8. Report generation
report = monitor.get_monitoring_report()
print("\n=== 7. Monitoring Report ===")
print(json.dumps(report, indent=2, default=str))
print("\nModel monitoring system setup completed!")
```
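The Prometheus step above only prints the text exposition format for illustration. In a live service the same metrics would more commonly be exposed through the `prometheus_client` library; the sketch below assumes that setup (the metric names mirror the ones above, while the port and the commented update calls are illustrative, not part of the script above).

```python
# Minimal sketch: exposing the same metrics via prometheus_client (assumed setup).
from prometheus_client import Counter, Gauge, Histogram, start_http_server

model_accuracy = Gauge('model_accuracy', 'Model accuracy score')
model_f1_score = Gauge('model_f1_score', 'Model F1 score')
model_drift_detected = Gauge('model_drift_detected', 'Data drift detection flag')
model_predictions_total = Counter('model_predictions_total', 'Total predictions made')
model_latency_seconds = Histogram('model_latency_seconds', 'Prediction latency in seconds',
                                  buckets=(0.01, 0.05, 0.1))

start_http_server(8000)  # metrics scraped from http://localhost:8000/metrics

# Inside the prediction/monitoring loop (illustrative):
# model_accuracy.set(metrics_normal['accuracy'])
# model_f1_score.set(metrics_normal['f1'])
# model_drift_detected.set(int(drift_detected))
# model_predictions_total.inc(len(X_prod_normal))
# with model_latency_seconds.time():
#     model.predict(X_batch)
```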
- Drift Detection Techniques
- Kolmogorov-Smirnov Test: Statistical test for distribution change
- Population Stability Index: Measures feature distribution shifts
- Chi-square Test: Categorical feature drift
- Wasserstein Distance: Optimal transport-based distance
- Jensen-Shannon Divergence: Information-theoretic metric (a sketch of several of these measures follows this list)
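The implementation above uses the KS test and a chi-square-style check; the sketch below shows three of the other listed measures for a single numeric feature using only NumPy/SciPy. The `psi` helper, its quantile binning, and the synthetic data are illustrative assumptions, not a standard library API.

```python
# Minimal sketch: PSI, Wasserstein distance, and Jensen-Shannon divergence
# for one numeric feature. The psi() helper and binning choices are illustrative.
import numpy as np
from scipy.stats import wasserstein_distance
from scipy.spatial.distance import jensenshannon

def psi(baseline: np.ndarray, current: np.ndarray, n_bins: int = 10) -> float:
    """Population Stability Index over quantile bins of the baseline."""
    edges = np.quantile(baseline, np.linspace(0, 1, n_bins + 1))
    b_frac = np.histogram(baseline, bins=edges)[0] / len(baseline)
    # Clip current values into the baseline range so every sample lands in a bin
    c_frac = np.histogram(np.clip(current, edges[0], edges[-1]), bins=edges)[0] / len(current)
    b_frac = np.clip(b_frac, 1e-6, None)  # avoid log(0)
    c_frac = np.clip(c_frac, 1e-6, None)
    return float(np.sum((c_frac - b_frac) * np.log(c_frac / b_frac)))

rng = np.random.default_rng(42)
baseline = rng.normal(0.0, 1.0, 5_000)
current = rng.normal(0.3, 1.2, 5_000)  # shifted and widened distribution

print(f"PSI:                  {psi(baseline, current):.4f}")  # ~0.25+ is commonly read as major shift
print(f"Wasserstein distance: {wasserstein_distance(baseline, current):.4f}")

# Jensen-Shannon needs discrete distributions, so histogram both on shared bins
bins = np.histogram_bin_edges(np.concatenate([baseline, current]), bins=30)
p = np.histogram(baseline, bins=bins)[0] + 1e-9
q = np.histogram(current, bins=bins)[0] + 1e-9
print(f"Jensen-Shannon dist.: {jensenshannon(p / p.sum(), q / q.sum()):.4f}")  # sqrt of the divergence
```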
- Alert Thresholds
- Critical: >5% accuracy drop, immediate action
- High: Data drift in 3+ features, investigation needed
- Medium: Output drift, monitoring increased
- Low: Single feature drift, log and track (a severity-classification sketch appears at the end of this section)
- Deliverables
- Monitoring dashboards
- Alert configuration
- Drift detection reports
- Performance degradation analysis
- Retraining recommendations
- Action playbook
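To tie the alert tiers under Alert Thresholds to code, here is a minimal sketch of a severity classifier. The `classify_alert` function and its exact cut-offs mirror the thresholds listed above but are illustrative assumptions, not part of the monitoring system implemented earlier.

```python
# Minimal sketch: map monitoring signals to the alert tiers listed above.
# The function name and cut-offs are illustrative assumptions.
def classify_alert(accuracy_drop: float, n_drifted_features: int, output_drift: bool) -> str:
    """Return an alert severity for a monitored batch."""
    if accuracy_drop > 0.05:        # Critical: >5% accuracy drop
        return 'critical'
    if n_drifted_features >= 3:     # High: data drift in 3+ features
        return 'high'
    if output_drift:                # Medium: prediction distribution shifted
        return 'medium'
    if n_drifted_features >= 1:     # Low: single feature drift
        return 'low'
    return 'none'

# Example: a 7-point accuracy drop dominates everything else
print(classify_alert(accuracy_drop=0.07, n_drifted_features=1, output_drift=False))  # critical
```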